Glue PythonShellジョブからRedshift Spectrumパーティションを更新する
GlueのPythonShellを使って Redshift Spectrumで参照するS3パスにファイルを追加していくというジョブの作成をするという場面がありました。 Spectrumは基本的に容量が多いデータが対象になると思いますので、 データをS3に出力したらパーティション更新のクエリ実行もほぼ必須となります。 今回はこのクエリを実行する方法を調べました。
PyGreSQLでの実行を試す
PythonShellからRedshiftでSQLクエリを実行するにあたってはPyGreSQLがデフォルトで使えますので、 それで実装をすれば良いと思っていました。 具体的には以下のような形です。
import pgdb query = ''' alter table spectrum_schema.table_name add if not exists partition(date='{0}') location 's3://bucket_name/spectrum/date={0}/'; '''.format(date) conn = pgdb.connect(host=xxx, database=xxx, user=xxx, password=xxx) cur = conn.cursor() cur.execute(query) conn.commit()
しかし、これを実行しようとするとこんな感じのエラーが出ました。
INSERT INTO EXTERNAL TABLE cannot run inside a transaction block
このエラーコード自体はEXTERNAL TABLEにINSERT INTOしようとした際の文言ですが、ほぼ同等のものが出ました。 着目すべきは後半で、トランザクションの中では実行できないとのことです。 SpectrumはRedshiftだけに閉じた話ではないので、トランザクションを使えないというのは特に違和感はないですね。
トランザクション内で実行できないので、クエリをオートコミットで実行することで解決するはずです。
PyGreSQLのマニュアルをみるとConnection.autocommit
というプロパティがあり、これを使えばOKそうです。
・・・が、結果的にPythonShellでは、これではダメでした。
というのも、このautocommit
はNew in version 5.1
。
一方PythonShellでpygresql
のバージョンを調べてみると5.0.6
でした。
import pkg_resources print(pkg_resources.get_distribution("pygresql").version) # => 5.0.6
ということでPyGreSQLを使ってのオートコミットは諦めることにしました。
解決方法
PyGreSQLが使えないので、潔くpsycopg2を使うことにしました。 GlueジョブはS3においたwhlファイルを読み込ませることができるので、その準備をします。 psycopg2-binaryは、whlファイルがPyPIにて公開されているので、これを持ってくるのが一番簡単そうです。 以下の場所からダウンロードできます。
https://pypi.org/project/psycopg2-binary/#files
PythonShellで使用する場合は下記の名前のファイルが適合します。 Pythonのバージョンは今の所3.6で固定ですので、そこもご注意を。
psycopg2_binary-2.8.6-cp36-cp36m-manylinux1_x86_64.whl
これを適当なS3パスでアップロードします。 そしてアップロードしたwhlファイルが読み込めるように、 実行時には「Python ライブラリパス」としてS3パスを指定します。
これでimport psycopg2
ができるようになります。
サンプルコードはこんな感じです。
import psycopg2 query = ''' alter table spectrum_schema.table_name add if not exists partition(date='{0}') location 's3://bucket_name/spectrum/date={0}/'; '''.format(date) conn = psycopg2.connect(url) conn.set_isolation_level(0) with conn.cursor() as cursor: cursor.execute(query)
set_isolation_level(0)
を指定することでオートコミットでクエリが実行されます。
これでパーティションの更新ができました。
まとめ
GlueのPythonShellでオートコミットを有効にしたクエリの実行を行いました。 中途半端にPyGreSQLがデフォルトで使える状況だったので苦戦してしまいましたが、 単純にpsycopg2を使うことで実現できました。
Spectrumに限らず、Athenaでも同様の状況になることが予想できますが、 同じ方法で解決できるものと思います。